java 9的時候新增支援Reactive Stream,所以在介紹Spring Reactor、WebFlux之前先來認識一下Java 原生的Flow Api。
java 9更新推出對應Reactive Streams 規格的分別有三個interface,Publisher負責產生item來推送給一個或多個Subscribers去使用(consumed),每一個item都被Subscription所管理,中間有一或多個Processor來處理轉換或是特別的邏輯。

Subscriber訂閱Publisher,Publisher負責產生推送資料,透過Subscription 居中管理, Subscriber會提出需要的資料量,這樣就有背壓(backpressure)機制(之後介紹),來避免Subscriber來不及消化大量資料導致系統異常。下方是java.util.concurrent.Flow 移除掉文件說明的部分,從程式碼可以看出
Publisher提供訂閱Subscriber 有四個方法 分別為
Subscription
Publisher結束時/*
public final class Flow {
private Flow() {} // uninstantiable
@FunctionalInterface
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
public static interface Subscription {
public void request(long n);
public void cancel();
}
}
實際動手做一個簡單聽Podcast要取得節目內容的DEMO
建立Member會員實作Subscriber,假設會員要訂閱Podcast頻道,訂閱時同步subscription並要求一個item,每接受到一個item就存入episodes 並要求下一個,最後當Publisher結束時則會印出onComplete。
@Data
public class Member <T> implements Subscriber<T> {
private Subscription subscription;
private List<T> episodes = new LinkedList<>();
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
System.out.println("onNext:" + item);
episodes.add(item);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError:" + throwable);
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
}
因為Publisher比較複雜我們直接拿原生的SubmissionPublisher來使用,實際DEMO情況如下
新增一個會員跟一個Podcast頻道並訂閱,將內容推送到頻道當中,在第一時間印出是可以發現尚未收到任何集數,因為整個流程是非同步的在背後進行,所以當我們暫停一秒鐘後就可以正常到看到結果。
@Test
void testJava9Reactive() throws InterruptedException {
Member<String> member = new Member<>();
SubmissionPublisher<String> podcastChannel = new SubmissionPublisher<>();
podcastChannel.subscribe(member);
Assertions.assertEquals(1, podcastChannel.getSubscribers().size());
List<String> episodes = List.of("1", "2", "3", "4");
episodes.forEach(podcastChannel::submit);
System.out.println(member.getEpisodes());
Thread.sleep(1000);
podcastChannel.close();
System.out.println(member.getEpisodes());
/* output:
[]
onNext:1
onNext:2
onNext:3
onNext:4
[1, 2, 3, 4]
onComplete
*/
}
今天終於開始進入到Reactive Programming並帶有一點實作,下一篇會繼續介紹今天用到的SubmissionPublisher與一開始提到的Processor。